-
Notifications
You must be signed in to change notification settings - Fork 212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PROTON-1442: [Cpp] Support for local transactions #437
base: main
Are you sure you want to change the base?
Conversation
* Added an extra handler to the python binding so that we can handle transactioned dispositions
* Modified the Python example broker so that it understands transaction requests, prints some useful output about what is happening, but doesn't honor the transaction semantics. It will queue up transactioned messages immediately and also doesn't correctly handle outgoing message releases (but it doesn't for non-transactioned messages either
* Make it compile * Make it fit the existing software structure better
9c4c6d3
to
74818fc
Compare
74818fc
to
eb4514b
Compare
3e96834
to
fa9236d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've taken a fairly thorough look. If you have any questions about these review items we should discuss further.
cpp/include/proton/container.hpp
Outdated
@@ -312,6 +312,7 @@ class PN_CPP_CLASS_EXTERN container { | |||
/// Cancel task for the given work_handle. | |||
PN_CPP_EXTERN void cancel(work_handle); | |||
|
|||
PN_CPP_EXTERN transaction declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge = false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you take a connection as the first parameter here it probably makes more sense to move this entire method into the proton::connection class.
Actually maybe this should be a method on session to match the cms api, in either case you can find the connection easily - as a result it would make sense to hold the transaction as state in the connection or session and only allow a single transaction at that level.
In this case it will be easy enough to find the transaction as you can always navigate upwards from the delivery to find its session and on to the connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the entire declare_transaction method to proton::session class.
PN_CPP_EXTERN target_options& type(const int); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be needed. Introduce a new coordinator class that is peer to sender and receiver
class transaction_impl { | ||
public: | ||
proton::sender txn_ctrl; | ||
proton::transaction_handler *handler = nullptr; | ||
proton::binary id; | ||
proton::tracker _declare; | ||
proton::tracker _discharge; | ||
bool failed = false; | ||
std::vector<proton::tracker> pending; | ||
|
||
void commit(); | ||
void abort(); | ||
void declare(); | ||
proton::tracker send(proton::sender s, proton::message msg); | ||
|
||
void discharge(bool failed); | ||
void release_pending(); | ||
void accept(delivery &d); | ||
void update(tracker &d, uint64_t state); | ||
void set_id(binary _id); | ||
|
||
proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); | ||
void handle_outcome(proton::tracker t); | ||
transaction_impl(proton::sender &_txn_ctrl, | ||
proton::transaction_handler &_handler, | ||
bool _settle_before_discharge); | ||
|
||
// delete copy and assignment operator to ensure no copy of this object is | ||
// every made transaction_impl(const transaction_impl&) = delete; | ||
// transaction_impl& operator=(const transaction_impl&) = delete; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create an internal header file for this class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually as I said above this entire class definition can probably go into transaction.cpp, there may not need to be a separate declaration.
class | ||
PN_CPP_CLASS_EXTERN transaction_handler { | ||
public: | ||
PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
||
/// Called when a local transaction is declared. | ||
PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
||
/// Called when a local transaction is discharged successfully. | ||
PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
||
/// Called when a local transaction is discharged unsuccessfully (aborted). | ||
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
||
/// Called when a local transaction declare fails. | ||
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
||
/// Called when the commit of a local transaction fails. | ||
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
}; | ||
|
||
} // namespace proton | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be in a different header file like message and messaging_handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API makes transactions hidden in a session then these callbacks either go away or instead become session callbacks: on_session_transaction_committed(session&), etc. I think transaction_declared() goes away entirely and it's purpose is now another use for on_session_open(session&). on_.._declare_failed should be handled by the on_session_error(session&).
// TODO: This should not be accessible to users. | ||
class transaction_impl { | ||
public: | ||
proton::sender txn_ctrl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is one per connection not per transaction. The transaction needs a way to find its conection (or session if it's per session) anyway so you should always be able to find the transaction controller
proton::tracker _declare; | ||
proton::tracker _discharge; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These must be exclusive as you cannot be delivering a declare and a discharge at the same time so I think you only need to keep track of a single outgoing delivery.
cpp/examples/tx_recv.cpp
Outdated
proton::transaction transaction; | ||
proton::connection connection; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't need to hold the connection as we should always be able to find the connection in a handler. This may be true for the transaction too - either we restrict the API to be one transaction per session or one transaction per connection. I think per session is porbably better as it matches the cms/jms api.
fa9236d
to
347cf6e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've not reviewed this comprehensively, but I think I'm now very much leaning towards not making the transaction class visible to the API user at all and having the transaction methods on the session. This is essentially the API in JMS and CMS.
|
||
class transaction_handler; | ||
|
||
// TODO: This should not be accessible to users. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the entirety of transaction_impl should be in transaction.cpp.
class transaction_impl { | ||
public: | ||
proton::sender txn_ctrl; | ||
proton::transaction_handler *handler = nullptr; | ||
proton::binary id; | ||
proton::tracker _declare; | ||
proton::tracker _discharge; | ||
bool failed = false; | ||
std::vector<proton::tracker> pending; | ||
|
||
void commit(); | ||
void abort(); | ||
void declare(); | ||
proton::tracker send(proton::sender s, proton::message msg); | ||
|
||
void discharge(bool failed); | ||
void release_pending(); | ||
void accept(delivery &d); | ||
void update(tracker &d, uint64_t state); | ||
void set_id(binary _id); | ||
|
||
proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); | ||
void handle_outcome(proton::tracker t); | ||
transaction_impl(proton::sender &_txn_ctrl, | ||
proton::transaction_handler &_handler, | ||
bool _settle_before_discharge); | ||
|
||
// delete copy and assignment operator to ensure no copy of this object is | ||
// every made transaction_impl(const transaction_impl&) = delete; | ||
// transaction_impl& operator=(const transaction_impl&) = delete; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually as I said above this entire class definition can probably go into transaction.cpp, there may not need to be a separate declaration.
public: | ||
// TODO: | ||
// PN_CPP_EXTERN transaction(transaction &o); | ||
PN_CPP_EXTERN transaction(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking it through I don't think that the user should be able to make a transaction at all, so take this out of the interface. I think the only way to get a transaction should be as the result of declaring a transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having said that I think a better overall API might be not to expose the transaction class to the API user at all and to have all the methods either on session or session::commit(), session::abort(), session::is_transactioned(). I think the way to do this without introducing a new transacted_session class would be to add a new session_option - something like bool transactioned() to make a transactioned session. Then in the transactioned case only call on_session_open when the transaction has been successfully declared. This would be instead of the transaction_handler::transaction_declared() callback.
Now any sender or receiver that is created from this session transparently will send or acknowledge in the transaction.
class | ||
PN_CPP_CLASS_EXTERN transaction_handler { | ||
public: | ||
PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
||
/// Called when a local transaction is declared. | ||
PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
||
/// Called when a local transaction is discharged successfully. | ||
PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
||
/// Called when a local transaction is discharged unsuccessfully (aborted). | ||
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
||
/// Called when a local transaction declare fails. | ||
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
||
/// Called when the commit of a local transaction fails. | ||
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
}; | ||
|
||
} // namespace proton | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API makes transactions hidden in a session then these callbacks either go away or instead become session callbacks: on_session_transaction_committed(session&), etc. I think transaction_declared() goes away entirely and it's purpose is now another use for on_session_open(session&). on_.._declare_failed should be handled by the on_session_error(session&).
PN_CPP_EXTERN void transaction(transaction t); | ||
|
||
PN_CPP_EXTERN class transaction transaction() const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these are not use visible. They are only used internally in the implementation - especially if the transaction is hidden inside the session.
send(s); | ||
} | ||
|
||
void send(proton::sender &s) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sender is defined as tx_recv class attribute, so I believe we don't need to pass the sender to send method.
transaction.accept(d); | ||
current_batch += 1; | ||
if(current_batch == batch_size) { | ||
transaction = proton::transaction(); // null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should do rather commit here, that way it works as expected and the receiver is closed after expected number of messages received (with the current implementation it's not).
|
||
int main(int argc, char **argv) { | ||
std::string address("127.0.0.1:5672/examples"); | ||
int message_count = 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect the defaults to be the same for tx_send and tx_recv (currently tx_send defaults to 6 messages while tx_recv to 9)
PROTON-1442
AMQP Transaction Sequence:
Client establishes link to Broker (Transaction resource) to target with transaction coordinator type (usual types are sender/receiver) (ATTACH frame)
Client (Transaction Controller) sends a special message to that link to create transaction (TRANSFER frame)
Broker returns a disposition with the transaction id (DISPOSITION frame)